/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zookeeper

import (
	"fmt"
	"net/url"
	"path"
	"sync"
)

import (
	"github.com/dubbogo/go-zookeeper/zk"

	gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
	"github.com/dubbogo/gost/log/logger"

	perrors "github.com/pkg/errors"
)

import (
	"dubbo.apache.org/dubbo-go/v3/common"
	"dubbo.apache.org/dubbo-go/v3/common/constant"
	"dubbo.apache.org/dubbo-go/v3/common/extension"
	"dubbo.apache.org/dubbo-go/v3/registry"
	"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)

func init() {
	extension.SetRegistry("zookeeper", newZkRegistry)
}

type zkRegistry struct {
	registry.BaseRegistry
	client       *gxzookeeper.ZookeeperClient
	listenerLock sync.Mutex
	listener     *zookeeper.ZkEventListener
	dataListener *RegistryDataListener
	cltLock      sync.Mutex
	zkPath       map[string]int // key = protocol://ip:port/interface
}

func newZkRegistry(url *common.URL) (registry.Registry, error) {
	var (
		err error
		r   *zkRegistry
	)
	r = &zkRegistry{
		zkPath: make(map[string]int),
	}
	logger.Infof("[Zookeeper Registry] New zookeeper registry with url %+v", url.ToMap())
	r.InitBaseRegistry(url, r)

	err = zookeeper.ValidateZookeeperClient(r, url.Location)
	if err != nil {
		return nil, err
	}

	r.WaitGroup().Add(1)
	go zookeeper.HandleClientRestart(r)

	r.listener = zookeeper.NewZkEventListener(r.client)

	r.dataListener = NewRegistryDataListener()

	return r, nil
}

// Options defines optional parameters to construct a zkRegistry (used in tests/mocks).
type Options struct {
}

// Option mutates Options when constructing a zkRegistry (functional options pattern).
type Option func(*Options)

// InitListeners initializes listeners of zookeeper registry center
func (r *zkRegistry) InitListeners() {
	r.listener = zookeeper.NewZkEventListener(r.client)
	newDataListener := NewRegistryDataListener()
	// should recover if dataListener isn't nil before
	if r.dataListener != nil {
		// close all old listener
		oldDataListener := r.dataListener
		oldDataListener.mutex.Lock()
		defer oldDataListener.mutex.Unlock()
		r.dataListener.closed = true
		recovered := r.dataListener.subscribed
		if len(recovered) > 0 {
			// recover all subscribed url
			for _, oldListener := range recovered {
				var (
					regConfigListener *RegistryConfigurationListener
					ok                bool
				)
				if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok {
					regConfigListener.Close()
				}
				newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL))
				go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/%s/%s/"+constant.DefaultCategory, r.GetParam(constant.RegistryGroupKey, "dubbo"), url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener)

			}
		}
	}
	r.dataListener = newDataListener
}

// CreatePath creates the path in the registry center of zookeeper
func (r *zkRegistry) CreatePath(path string) error {
	err := r.ZkClient().Create(path)
	if err != nil && err != zk.ErrNodeExists {
		return err
	}
	return nil
}

// DoRegister actually do the register job in the registry center of zookeeper
func (r *zkRegistry) DoRegister(root string, node string) error {
	return r.registerTempZookeeperNode(root, node)
}

func (r *zkRegistry) DoUnregister(root string, node string) error {
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
	if !r.ZkClient().ZkConnValid() {
		return perrors.Errorf("zk client is not valid.")
	}
	return r.ZkClient().Delete(path.Join(root, node))
}

// DoSubscribe actually subscribes the provider URL
func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
	return r.getListener(conf)
}

func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
	return r.getCloseListener(conf)
}

// LoadSubscribeInstances load subscribe instance
func (r *zkRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error {
	return nil
}

// CloseAndNilClient closes listeners and clear client
func (r *zkRegistry) CloseAndNilClient() {
	r.listener.Close()
	r.client.Close()
	r.client = nil
}

// ZkClient returns the underlying zookeeper client.
func (r *zkRegistry) ZkClient() *gxzookeeper.ZookeeperClient {
	return r.client
}

// SetZkClient sets the underlying zookeeper client.
func (r *zkRegistry) SetZkClient(client *gxzookeeper.ZookeeperClient) {
	r.client = client
}

// ZkClientLock exposes the lock guarding client changes.
func (r *zkRegistry) ZkClientLock() *sync.Mutex {
	return &r.cltLock
}

// CloseListener closes listeners
func (r *zkRegistry) CloseListener() {
	if r.dataListener != nil {
		r.dataListener.Close()
	}
}

func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
	var (
		err    error
		zkPath string
	)

	r.cltLock.Lock()
	defer r.cltLock.Unlock()
	if r.client == nil {
		return perrors.WithStack(perrors.New("zk client already been closed"))
	}
	logger.Infof("[Zookeeper Registry] Registry instance with root = %s, node = %s", root, node)
	err = r.client.Create(root)
	if err != nil && err != zk.ErrNodeExists {
		logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
		return perrors.WithStack(err)
	}

	// Try to register the node
	zkPath, err = r.client.RegisterTemp(root, node)
	if err == nil {
		return nil
	}

	// Maybe the node did exist, then we need to delete it first and recreate it
	if perrors.Cause(err) == zk.ErrNodeExists {
		if err = r.client.Delete(zkPath); err == nil {
			_, err = r.client.RegisterTemp(root, node)
		}

		if err == nil {
			return nil
		}
	}

	logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
	return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}

func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
	var zkListener *RegistryConfigurationListener
	dataListener := r.dataListener
	ttl := r.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)
	conf.SetParam(constant.RegistryTTLKey, ttl)
	dataListener.mutex.Lock()
	defer dataListener.mutex.Unlock()
	if r.dataListener.subscribed[conf.ServiceKey()] != nil {
		zkListener, _ = r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener)
		if zkListener != nil {
			r.listenerLock.Lock()
			defer r.listenerLock.Unlock()
			if zkListener.isClosed {
				return nil, perrors.New("configListener already been closed")
			} else {
				return zkListener, nil
			}
		}
	}

	zkListener = NewRegistryConfigurationListener(r.client, r, conf)
	if r.listener == nil {
		r.cltLock.Lock()
		client := r.client
		r.cltLock.Unlock()
		if client == nil {
			return nil, perrors.New("zk connection broken")
		}

		// new client & listener
		listener := zookeeper.NewZkEventListener(r.client)

		r.listenerLock.Lock()
		r.listener = listener
		r.listenerLock.Unlock()
	}

	// Interested register to dataconfig.
	r.dataListener.SubscribeURL(conf, zkListener)

	go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/%s/%s/"+constant.DefaultCategory, r.GetParam(constant.RegistryGroupKey, "dubbo"), url.QueryEscape(conf.Service())), r.dataListener)

	return zkListener, nil
}

func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) {
	var zkListener *RegistryConfigurationListener
	r.dataListener.mutex.Lock()
	configurationListener := r.dataListener.subscribed[conf.ServiceKey()]
	if configurationListener != nil {
		zkListener, _ = configurationListener.(*RegistryConfigurationListener)
		if zkListener != nil && zkListener.isClosed {
			r.dataListener.mutex.Unlock()
			return nil, perrors.New(fmt.Sprintf("configListener for service %s has already been closed", conf.ServiceKey()))
		}
	}

	if configurationListener := r.dataListener.UnSubscribeURL(conf); configurationListener != nil {
		switch v := configurationListener.(type) {
		case *RegistryConfigurationListener:
			if v != nil {
				zkListener = v
			}
		}
	}
	r.dataListener.mutex.Unlock()

	if r.listener == nil {
		return nil, perrors.New("Zookeeper event listener is null, can not close.")
	}

	return zkListener, nil
}
